package com.migrate.module.migrate;


import com.migrate.module.domain.BinlogData;
import com.migrate.module.domain.EtlBinlogConsumeRecord;
import com.migrate.module.enumeration.OperateType;
import com.migrate.module.lock.PutBinlogLock;
import lombok.extern.slf4j.Slf4j;

import java.util.LinkedList;

/**
 * 数据缓存阻塞队列类
 *
 * @author zhonghuashishan
 */
@Slf4j
public class LocalQueue {

    private static volatile LocalQueue localQueue;

    /**
     * 数据同步的写队列
     */
    private  volatile LinkedList<BinlogData> writeQueue = new LinkedList<>();
    /**
     *  数据同步的 读队列
     */
    private  volatile LinkedList<BinlogData> readQueue = new LinkedList<>();
    /**
     * 提供锁的实例对象
     */
    private  final PutBinlogLock lock = new PutBinlogLock();
    /**
     * 是否正在读取数据
     */
    private volatile boolean isRead = false;



    private LocalQueue(){

    }

    /**
     * 构建一个单例模式对象
     * @return LocalQueue实例
     */
    public static  LocalQueue getInstance(){
        if (null == localQueue){
            synchronized (LocalQueue.class){
                if (null == localQueue){
                    localQueue = new LocalQueue();
                }
            }
        }
        return localQueue;
    }

    /**
     * 数据写入队列
     * @param binlogData MySQL的binlog对象
     * @param consumeRecord 消费记录
     */
    public void submit(BinlogData binlogData, EtlBinlogConsumeRecord consumeRecord) {
        lock.lock();
        try {
            binlogData.setConsumeRecord(consumeRecord);
            writeQueue.add(binlogData);
        } finally {
            lock.unlock();
        }
    }

    /**
     *  交换队列
     */
    private void swapRequests() {
        lock.lock();
        try {
            log.info("本次同步数据写入："+writeQueue.size()+"条数");
            LinkedList<BinlogData> tmp = writeQueue;
            writeQueue = readQueue;
            readQueue = tmp;
        } finally {
            lock.unlock();
        }
    }
    /**
     * 将读队列缓存的数据，进行数据合并处理，并写入存储落地
     */
    public void doCommit() {
        // 标记目前正在读取数据同步落地
        isRead = true;
        // 读取数据，并写入完成以后，交互一下读 写队列
        swapRequests();
        if (!readQueue.isEmpty()) {
            MergeBinlogWrite mergeBinlogWrite = new MergeBinlogWrite();
            // 遍历存储在读队列的数据，进行数据合并，保留时间最新的操作
            for (BinlogData binlogData : readQueue) {
                // 对数据进行合并处理
                mergeBinlogWrite.mergeBinlog(binlogData);
            }
            // 对数据进行核验，过滤 无效的数据，例如已经小于新库记录时间的
            mergeBinlogWrite.filterBinlogAging();
            // 数据写入，按表分组写入
            mergeBinlogWrite.write(OperateType.ADD);
        }
        readQueue.clear();
        isRead = false;
    }

    /**
     *  获取是否正在读取数据解析落地
     * @return 是否正在读取
     */
    public Boolean getIsRead(){
        return this.isRead;
    }
}
